<?php

namespace WhaleIot\mqtt;

use Exception;
use WhaleIot\mqtt\libs\Message;
use Workerman\Timer;
use Workerman\Worker;
use Workerman\Mqtt\Client;

class BusinessWorker extends Worker
{

    /**
     * 保存用户设置的 worker 启动回调
     *
     * @var callable|null
     */
    protected $_onWorkerStart = null;
    /**
     * 保存用户设置的 workerReload 回调
     *
     * @var callable|null
     */
    protected $_onWorkerReload = null;

    /**
     * 保存用户设置的 workerStop 回调
     *
     * @var callable|null
     */
    protected $_onWorkerStop = null;

    /**
     * Event::onConnect事件
     */
    protected $_eventOnConnect;
    /**
     * Event::onMessage事件
     */
    protected $_eventOnMessage;
    /**
     * Event::onClose事件
     */
    protected $_eventOnClose;
    /**
     * Event::onLos事件
     */
    protected $_eventOnLog;
    /**
     * Event::onError事件
     */
    protected $_eventOnError;
    /**
     * Event::onWorkerStop事件
     */
    protected $_eventOnWorkerStop;
    /**
     * 事件处理类，默认是 Event 类
     * @var string
     */
    public $eventHandler = 'Event';

    /**
     * mqtt地址
     * @var string
     */
    public $mqtt_address = 'mqtt://127.0.0.1:1883';
    /**
     * mqtt参数
     * @var array
     * -username 用户名
     * -password 密码
     * -client_id 连接ID
     * -debug bool 调试模式
     */
    public $mqtt_options = [
        'username'  => 'admin',
        'password'  => '123456',
        'client_id' => 'whale_iot',
        'debug'     => false,
    ];

    /**
     * 订阅主题
     * @var string[]
     */
    public $subscribe = [
        '/v1/device/+/rx' => 0
    ];
    /**
     * redis地址
     * @var string
     */
    public $redis_address = 'redis://127.0.0.1:6379';

    /**
     * redis参数
     * @var array
     */
    public $redis_options = [
        'auth'          => '',//鉴权信息
        'db'            => 0,
        'max_attempts'  => 5,//消费失败后重试次数
        'retry_seconds' => 5//重试时间间隔
    ];
    /**
     * @var string
     */
    public $redis_queue = '';


    /**
     * 构造函数
     *
     * @param string $socket_name
     * @param array $context_option
     */
    public function __construct($socket_name = '', array $context_option = array())
    {
        parent::__construct($socket_name, $context_option);
        $debug_back_race         = debug_backtrace();
        $this->_autoloadRootPath = dirname($debug_back_race[0]['file']);
    }

    /**
     * 注释:运行worker
     * 创建者:JSL
     * 时间:2023/06/24 024 下午 04:03
     * @throws Exception
     */
    public function run()
    {
        $this->_onWorkerStart  = $this->onWorkerStart;
        $this->_onWorkerReload = $this->onWorkerReload;
        $this->_onWorkerStop   = $this->onWorkerStop;
        $this->onWorkerStop    = array($this, 'onWorkerStop');
        $this->onWorkerStart   = array($this, 'onWorkerStart');
        $this->onWorkerReload  = array($this, 'onWorkerReload');
        parent::run();
    }

    protected function onWorkerStart(Worker $worker)
    {
        global $devices;
        if (function_exists('opcache_reset')) {
            opcache_reset();
        }
        if ($this->_onWorkerStart) {
            call_user_func($this->_onWorkerStart, $this);
        }
        if (is_callable($this->eventHandler . '::onWorkerStart')) {
            call_user_func($this->eventHandler . '::onWorkerStart', $this);
        }
        // 设置回调
        if (is_callable($this->eventHandler . '::onConnect')) {
            $this->_eventOnConnect = $this->eventHandler . '::onConnect';
        }

        if (is_callable($this->eventHandler . '::onMessage')) {
            $this->_eventOnMessage = $this->eventHandler . '::onMessage';
        }
        if (is_callable($this->eventHandler . '::onClose')) {
            $this->_eventOnClose = $this->eventHandler . '::onClose';
        }
        if (is_callable($this->eventHandler . '::onLog')) {
            $this->_eventOnLog = $this->eventHandler . '::onLog';
        }
        if (is_callable($this->eventHandler . '::onError')) {
            $this->_eventOnError = $this->eventHandler . '::onError';
        }
        //订阅消息
        try {
            $this->setMqttOption();
            $this->setRedisOption();
            $mqtt            = new Client($this->mqtt_address, $this->mqtt_options);
            $mqtt->onConnect = function ($mqtt) {
                $mqtt->subscribe($this->subscribe);
                $client = new \Workerman\RedisQueue\Client($this->redis_address, $this->redis_options);
                $client->subscribe($this->redis_queue, function ($data) use ($mqtt) {
                    if (isset($data['imei']) && isset($data['cmd']) && isset($data['param'])) {
                        list($msg, $hex, $log, $name) = Message::setDatasource($data);
                        $mqtt->publish('/v1/device/' . $data['imei'] . '/tx', $msg);
                        $logs = [
                            'hex'  => $hex,
                            'log'  => $log,
                            'cmd'  => '0x' . $hex[2],
                            'name' => $name
                        ];
                        if ($this->_eventOnLog) {
                            call_user_func($this->_eventOnLog, $data['imei'], $logs, 'send');
                        }
                    }
                });
            };
            $mqtt->onMessage = function ($topic, $content, $mqtt) use ($devices) {
                global $devices;
                //截取imei
                preg_match('/(device\/)(.*)(?)(\/rx)/', $topic, $result);
                $imei = $result[2];
                //解析报文
                list($hex, $dec) = Message::mimeMessages($content);
                //拼接报文
                if (!Message::spliceMessage($imei, $hex, $dec)) {
                    return;
                }
                //异或校验
                if (!Message::checkXorData($dec)) {
                    return;
                }
                //解构报文
                list($param, $log, $name) = Message::getDatasource($imei, $hex, $dec);
                $data = [
                    'cmd'   => $dec[2],
                    'hex'   => $hex,
                    'dec'   => $dec,
                    'param' => $param,
                ];
                if (!isset($devices[$imei])) {
                    //设备上线通知
                    if ($this->_eventOnConnect) {
                        call_user_func($this->_eventOnConnect, $imei);
                    }
                }
                $devices[$imei] = time();
                //回调方法
                if ($this->_eventOnMessage) {
                    call_user_func($this->_eventOnMessage, $imei, $data);
                }
                $logs = [
                    'hex'  => $hex,
                    'log'  => $log,
                    'cmd'  => '0x' . $hex[2],
                    'name' => $name
                ];
                if ($this->_eventOnLog) {
                    call_user_func($this->_eventOnLog, $imei, $logs, 'receive');
                }
            };
            $mqtt->connect();
            //检测离线设备
            Timer::add(1, function () use ($devices) {
                global $devices;
                if (is_array($devices)) {
                    foreach ($devices as $key => $value) {
                        $time = $value + 300;//5分钟检测心跳时间
                        if ($time < time()) {
                            //通知设备离线
                            if ($this->_eventOnClose) {
                                call_user_func($this->_eventOnClose, $key);
                            }
                            unset($devices[$key]);
                        }
                    }
                }
            });
        } catch (Exception $e) {
            if ($this->_eventOnError) {
                call_user_func($this->_eventOnError, $e);
            }
        }
    }

    /**
     * 注释:设置mqtt参数
     * 创建者:JSL
     * 时间:2023/06/28 028 上午 11:43
     */
    private function setMqttOption()
    {
        if (function_exists('getWhaleIotConfig') && getWhaleIotConfig('whale-iot')) {
            $config             = getWhaleIotConfig('whale-iot.mqtt');
            $this->mqtt_address = 'mqtt://' . ($config['ip'] ?? '127.0.0.1') . ':' . ($config['port'] ?? '1883');
            $this->mqtt_options = [
                'username'  => $config['username'] ?? 'admin',
                'password'  => $config['password'] ?? '123456',
                'client_id' => ($config['client_id'] ?? 'whale_iot') . '_' . mt_rand(),
                'debug'     => $config['debug'] ?? false,
            ];
        } else {
            if (isset($this->mqtt_options['client_id'])) {
                $this->mqtt_options['client_id'] = $this->mqtt_options['client_id'] . '_' . mt_rand();
            }
        }
    }

    /**
     * 注释:设置redis参数
     * 创建者:JSL
     * 时间:2023/06/29 029 下午 02:11
     */
    private function setRedisOption()
    {
        if (function_exists('getWhaleIotConfig') && getWhaleIotConfig('whale-iot')) {
            $config              = getWhaleIotConfig('whale-iot.redis');
            $this->redis_address = 'redis://' . ($config['ip'] ?? '127.0.0.1') . ':' . ($config['port'] ?? '6379');
            $this->redis_options = [
                'auth'          => $config['auth'] ?? '',//鉴权信息
                'db'            => $config['db'] ?? 0,
                'max_attempts'  => $config['max_attempts'] ?? 5,//消费失败后重试次数
                'retry_seconds' => $config['retry_seconds'] ?? 5//重试时间间隔
            ];
            $this->redis_queue   = $config['queue'] ?? 'whale-iot/mqtt/publish';
        }
    }

    /**
     * onWorkerReload 回调
     * @param Worker $worker
     */
    protected function onWorkerReload(Worker $worker)
    {
        // 防止进程立刻退出
        $worker->reloadable = false;
        // 延迟 0.05 秒退出，避免 Worker 瞬间全部退出导致没有可用的 Worker 进程
        Timer::add(0.05, array('Workerman\Worker', 'stopAll'));
        // 执行用户定义的 onWorkerReload 回调
        if ($this->_onWorkerReload) {
            call_user_func($this->_onWorkerReload, $this);
        }
    }

    /**
     * 当进程关闭时一些清理工作
     *
     * @return void
     */
    protected function onWorkerStop()
    {
        global $devices;
        $list = [];
        if (is_array($devices)) {
            foreach ($devices as $key => $value) {
                $list[] = $key;
            }
        }
        if ($this->_onWorkerStop) {
            call_user_func($this->_onWorkerStop, $this);
        }
        if (is_callable($this->eventHandler . '::onWorkerStop')) {
            call_user_func($this->eventHandler . '::onWorkerStop', $list);
        }
    }


}